使用虚拟线程进行网络 I/O - 幕后
Chris Hegarty 于 2021 年 5 月 10 日Project Loom 旨在提供 Java VM 功能和 API,以支持 Java 平台上易于使用、高吞吐量的轻量级并发和新的编程模型。这带来了许多有趣和令人兴奋的前景,其中之一是简化与网络交互的代码。如今,服务器可以处理的打开套接字连接数量远远超过它们可以支持的线程数量,这既带来了机遇,也带来了挑战。
不幸的是,编写与网络交互的可扩展代码很困难。当使用同步 API 时,存在一个阈值,超过该阈值,同步 API 无法扩展,因为这些 API 在执行 I/O 操作时可能会阻塞,这反过来会占用一个线程,直到操作就绪,例如,当尝试从套接字读取数据时,当前没有可用数据。线程(目前)是 Java 平台中一种昂贵的资源,在等待 I/O 操作完成时,占用线程的成本太高。为了解决此限制,我们通常会使用异步 I/O 或反应式框架,因为它们可以用于构建不会导致线程在 I/O 操作中被占用的代码,而是使用回调或事件通知来分别在 I/O 操作完成或就绪时通知。
异步和非阻塞 API 比同步 API 更难使用(部分原因是它们会导致对人类来说不自然的代码结构)。同步 API 在大多数情况下更容易使用;代码更容易编写、更容易阅读和更容易调试(具有有意义的堆栈跟踪!)。但是,如前所述,使用同步 API 的代码的扩展性不如异步变体,因此这让我们面临一个糟糕的选择 - 选择更直接的同步代码并接受它无法扩展,或者选择更可扩展的异步代码并处理其所有复杂性。这不是一个好的选择!Project Loom 的一个引人注目的价值主张是避免做出这种选择 - 同步代码应该能够扩展。
在本文中,我们将探讨在虚拟线程上调用时,Java 平台的网络 API 在幕后如何工作。这些细节在很大程度上是实现的产物,在编写顶层代码时无需了解,但了解幕后工作原理仍然很有趣,并且可以帮助回答一些问题,如果这些问题没有得到解答,可能会导致不得不做出那个艰难的选择。
虚拟线程
在继续之前,我们需要了解一下 Project Loom 中的新型线程 - 虚拟线程。
虚拟线程是用户模式线程,由 Java 虚拟机而不是操作系统调度。虚拟线程需要的资源很少,单个 Java 虚拟机可以支持数百万个虚拟线程。虚拟线程非常适合执行大部分时间处于阻塞状态的任务,通常等待 I/O 操作完成。
平台线程(我们在当前版本的 Java 平台中熟悉的线程)通常与操作系统调度的内核线程一一映射。平台线程通常具有由操作系统维护的大型堆栈和其他资源。
虚拟线程通常使用一组小的平台线程作为承载线程。在虚拟线程中执行的代码通常不会意识到底层的承载线程。锁定和 I/O 操作是调度点,在这些点上,承载线程从一个虚拟线程重新调度到另一个虚拟线程。虚拟线程可以被停放,这会禁用它进行调度。停放的虚拟线程可以被取消停放,这会重新启用它进行调度。
网络 API
在 Java 平台中,网络 API 有两大类
-
异步 -
AsynchronousServerSocketChannel
、AsynchronousSocketChannel
-
同步 - java.net
Socket
/ServerSocket
/DatagramSocket
、java.nio.channelsSocketChannel
/ServerSocketChannel
/DatagramChannel
第一类,异步,启动 I/O 操作,这些操作在稍后某个时间完成,可能在与启动 I/O 操作的线程不同的线程上完成。根据定义,这些 API 不会导致阻塞系统调用,因此在虚拟线程中运行时不需要特殊处理。
第二类,同步,从它们在虚拟线程中运行时的行为来看,更有趣。在此类别中,NIO 通道可以配置为非阻塞模式。这些通道通常注册到 I/O 事件通知机制(如 Selector
),并且不执行阻塞系统调用。与异步网络 API 类似,这些 API 在虚拟线程中运行时不需要特殊处理,因为 I/O 操作本身不会调用阻塞系统调用,这通常留给选择器。因此,这留下了java.net 套接字类型和以阻塞模式配置的 NIO 通道。让我们看看它们在虚拟线程中的行为。
同步 API 的语义要求 I/O 操作一旦启动,必须在调用线程中完成或失败,然后才能将控制权返回给调用者。但是,如果 I/O 操作“未就绪”,例如,套接字上没有数据可读怎么办?
同步阻塞 API
同步网络 Java API 在虚拟线程中运行时,会将底层原生套接字切换到非阻塞模式。当从 Java 代码调用的 I/O 操作没有立即完成(原生套接字返回 EAGAIN - “未就绪”/“将阻塞”)时,底层原生套接字将注册到 JVM 范围的事件通知机制(一个 Poller),并且虚拟线程被停放。当底层 I/O 操作就绪(事件到达 Poller)时,虚拟线程被取消停放,并且底层套接字操作被重试。
让我们通过一个示例更详细地了解一下。retrieveURLs
方法下载并返回给定 URL 的响应。
// Tuple of URL and response bytes
record URLData (URL url, byte[] response) { }
List<URLData> retrieveURLs(URL... urls) throws Exception {
try (var executor = Executors.newVirtualThreadExecutor()) {
var tasks = Arrays.stream(urls)
.map(url -> (Callable<URLData>)() -> getURL(url))
.toList();
return executor.submit(tasks)
.filter(Future::isCompletedNormally)
.map(Future::join)
.toList();
}
}
retrieveURLs
方法创建任务列表(每个 URL 一个任务),并将它们提交给执行器,然后等待结果。执行器为每个任务启动一个新的虚拟线程,该线程调用 getURL
。为简单起见,只返回成功完成的任务。
getURL
方法非常简单,使用同步 URLConnection
API 获取响应。
URLData getURL(URL url) throws IOException {
try (InputStream in = url.openStream()) {
return new URLData(url, in.readAllBytes());
}
}
readAllBytes
方法是一个批量同步读取操作,它读取所有响应字节。在幕后,readAllBytes
最终会归结为java.net 套接字输入流的 read
方法。
如果我们运行一个使用 retrieveURLs
下载 HTTP URL 的小程序,其中 HTTP 服务器没有提供完整的响应,我们可以检查线程的状态,如下所示
$ java Main & echo $!
89215
$ jcmd 89215 JavaThread.dump threads.txt
Created /Users/chegar/threads.txt
在threads.txt 中,我们看到了通常的系统线程,以及测试程序的主线程和在读取操作中被阻塞的虚拟线程。注意:虚拟线程没有名称,除非显式分配一个名称,因此为未命名。
$ cat threads.txt
...
"<unnamed>" #15 virtual
java.base/java.lang.Continuation.yield(Continuation.java:402)
java.base/java.lang.VirtualThread.yieldContinuation(VirtualThread.java:367)
java.base/java.lang.VirtualThread.park(VirtualThread.java:534)
java.base/java.lang.System$2.parkVirtualThread(System.java:2370)
java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:60)
java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:184)
java.base/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:212)
java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:320)
java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:356)
java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:807)
java.base/java.net.Socket$SocketInputStream.read(Socket.java:988)
java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:255)
java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:310)
java.base/java.io.BufferedInputStream.lockedRead(BufferedInputStream.java:382)
java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:361)
java.base/sun.net.www.MeteredStream.read(MeteredStream.java:141)
java.base/java.io.FilterInputStream.read(FilterInputStream.java:132)
java.base/sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3648)
java.base/java.io.InputStream.readNBytes(InputStream.java:409)
java.base/java.io.InputStream.readAllBytes(InputStream.java:346)
Main.getURL(Main.java:24)
Main.lambda$retrieveURLs$0(Main.java:13)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:268)
java.base/java.util.concurrent.ThreadExecutor$TaskRunner.run(ThreadExecutor.java:385)
java.base/java.lang.VirtualThread.run(VirtualThread.java:295)
java.base/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:172)
java.base/java.lang.Continuation.enter0(Continuation.java:372)
java.base/java.lang.Continuation.enter(Continuation.java:365)
从下往上看堆栈帧;首先,我们看到一些与设置虚拟线程相关的帧(“延续”是虚拟线程内部使用的 VM 机制),这些帧对应于由执行器服务创建的新线程。其次,我们看到几个帧,这些帧对应于测试程序调用 retrieveURLs
和 getURL
。第三,我们看到一些帧,这些帧对应于 HTTP 协议处理程序,最终对应于套接字输入流实现的 read
方法。最后,沿着这些帧向上查看堆栈,我们可以看到虚拟线程已被停放,这正是我们所期望的,因为服务器没有发送完整的响应,因此没有足够的数据可从套接字读取。但是,如果/当数据到达套接字时,是什么取消停放了虚拟线程?
更仔细地查看threads.txt 中的其他系统线程,我们看到
"Read-Poller" #16
java.base@17-internal/sun.nio.ch.KQueue.poll(Native Method)
java.base@17-internal/sun.nio.ch.KQueuePoller.poll(KQueuePoller.java:65)
java.base@17-internal/sun.nio.ch.Poller.poll(Poller.java:195)
java.base@17-internal/sun.nio.ch.Poller.lambda$startPollerThread$0(Poller.java:65)
java.base@17-internal/sun.nio.ch.Poller$$Lambda$14/0x00000008010579c0.run(Unknown Source)
java.base@17-internal/java.lang.Thread.run(Thread.java:1522)
java.base@17-internal/jdk.internal.misc.InnocuousThread.run(InnocuousThread.java:161)
此线程是 JVM 范围的读取轮询器。它的核心是一个基本事件循环,它监控所有在虚拟线程中调用时没有立即就绪的同步网络读取、连接和接受操作。当 I/O 操作就绪时,轮询器将收到通知,并随后取消停放相应的停放的虚拟线程。有一个等效的写入轮询器,用于写入操作。
上面的堆栈跟踪是在 macOS 上运行测试程序时捕获的,这就是为什么我们看到与 macOS 上轮询器实现相关的堆栈帧的原因,即 kqueue。在 Linux 上,轮询器使用 epoll,在 Windows 上使用 wepoll(它在 Winsock 的辅助功能驱动程序上提供类似 epoll 的 API)。
轮询器维护一个文件描述符到虚拟线程的映射。当文件描述符注册到轮询器时,将为该文件描述符添加一个条目,以及注册线程作为其值。轮询器的事件循环在被事件唤醒时,使用事件的文件描述符查找相应的虚拟线程并取消停放它。
扩展性
如果仔细观察,上面的行为与当前使用 NIO 通道和选择器的可扩展代码并没有太大区别 - 这些代码可以在许多服务器端框架和库中找到。虚拟线程的不同之处在于向开发人员公开的编程模型。前者公开了一个更复杂的模型,用户代码必须实现事件循环并在 I/O 边界上维护应用程序逻辑,而后者公开了一个更简单、更直接的编程模型,其中 Java 平台处理任务的调度和跨 I/O 边界的上下文维护。
用于调度虚拟线程的默认调度器是 fork-join 工作窃取调度器,它非常适合这项工作。用于监控就绪 I/O 操作的原生事件通知机制是操作系统提供的最现代、最有效的机制。虚拟线程构建在 Java VM 中的延续支持之上。因此,同步 Java 网络 API 应该与更复杂的异步和非阻塞代码结构具有可比的扩展性。
结论
同步 Java 网络 API 已由 JEP 353 和 JEP 373 重新实现,为 Project Loom 做准备。当在虚拟线程中运行时,未立即完成的 I/O 操作将导致虚拟线程被挂起。当 I/O “就绪” 时,虚拟线程将被解除挂起。该实现使用 Java VM 和核心库中的多个功能,提供了一种可扩展且高效的替代方案,与当前的异步和非阻塞代码结构相比具有优势。